#!/usr/bin/env python3


import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
import random
import string
import json

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()


def get_random_string(length):
    return "".join(
        random.choice(string.ascii_uppercase + string.digits) for _ in range(length)
    )


def test_system_replicated_fetches(started_cluster):
    node1.query(
        "CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple()"
    )
    node2.query(
        "CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple()"
    )

    with PartitionManager() as pm:
        node2.query("SYSTEM STOP FETCHES t")
        node1.query(
            "INSERT INTO t SELECT number, '{}' FROM numbers(10000)".format(
                get_random_string(104857)
            )
        )
        pm.add_network_delay(node1, 80)
        node2.query("SYSTEM START FETCHES t")
        fetches_result = []
        for _ in range(1000):
            result = json.loads(
                node2.query("SELECT * FROM system.replicated_fetches FORMAT JSON")
            )
            if not result["data"]:
                if fetches_result:
                    break
                time.sleep(0.1)
            else:
                fetches_result.append(result["data"][0])
                print(fetches_result[-1])
                time.sleep(0.1)

    node2.query("SYSTEM SYNC REPLICA t", timeout=10)
    assert node2.query("SELECT COUNT() FROM t") == "10000\n"

    for elem in fetches_result:
        elem["bytes_read_compressed"] = float(elem["bytes_read_compressed"])
        elem["total_size_bytes_compressed"] = float(elem["total_size_bytes_compressed"])
        elem["progress"] = float(elem["progress"])
        elem["elapsed"] = float(elem["elapsed"])

    assert len(fetches_result) > 0
    first_non_empty = fetches_result[0]

    assert first_non_empty["database"] == "default"
    assert first_non_empty["table"] == "t"
    assert first_non_empty["source_replica_hostname"] == "node1"
    assert first_non_empty["source_replica_port"] == 9009
    assert first_non_empty["source_replica_path"] == "/clickhouse/test/t/replicas/1"
    assert first_non_empty["interserver_scheme"] == "http"
    assert first_non_empty["result_part_name"] == "all_0_0_0"
    assert first_non_empty["result_part_path"].startswith("/var/lib/clickhouse/")
    assert first_non_empty["result_part_path"].endswith("all_0_0_0/")
    assert first_non_empty["partition_id"] == "all"
    assert first_non_empty["URI"].startswith(
        "http://node1:9009/?endpoint=DataPartsExchange"
    )

    for elem in fetches_result:
        # FIXME https://github.com/ClickHouse/ClickHouse/issues/45435
        # assert (
        #     elem["bytes_read_compressed"] <= elem["total_size_bytes_compressed"]
        # ), "Bytes read ({}) more than total bytes ({}). It's a bug".format(
        #     elem["bytes_read_compressed"], elem["total_size_bytes_compressed"]
        # )
        # assert (
        #     0.0 <= elem["progress"] <= 1.0
        # ), "Progress shouldn't less than 0 and bigger than 1, got {}".format(
        #     elem["progress"]
        # )
        assert (
            0.0 <= elem["elapsed"]
        ), "Elapsed time must be greater than 0, got {}".format(elem["elapsed"])

    prev_progress = first_non_empty["progress"]
    for elem in fetches_result:
        assert (
            elem["progress"] >= prev_progress
        ), "Progress decreasing prev{}, next {}? It's a bug".format(
            prev_progress, elem["progress"]
        )
        prev_progress = elem["progress"]

    prev_bytes = first_non_empty["bytes_read_compressed"]
    for elem in fetches_result:
        assert (
            elem["bytes_read_compressed"] >= prev_bytes
        ), "Bytes read decreasing prev {}, next {}? It's a bug".format(
            prev_bytes, elem["bytes_read_compressed"]
        )
        prev_bytes = elem["bytes_read_compressed"]

    prev_elapsed = first_non_empty["elapsed"]
    for elem in fetches_result:
        assert (
            elem["elapsed"] >= prev_elapsed
        ), "Elapsed time decreasing prev {}, next {}? It's a bug".format(
            prev_elapsed, elem["elapsed"]
        )
        prev_elapsed = elem["elapsed"]

    node1.query("DROP TABLE IF EXISTS t SYNC")
    node2.query("DROP TABLE IF EXISTS t SYNC")
